package cn.kai.ymcc.mq.consumer;

import cn.kai.ymcc.domain.PayOrder;
import cn.kai.ymcc.dto.PayDto;
import cn.kai.ymcc.service.IPayOrderService;
import cn.kai.ymcc.util.AssertUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * 课程订单生成支付单的消费者
 *   一个消息只能生成一个支付单，所以只能被一个消费者消费
 *   那么消费者只能使用 集群模式  --- 默认可以不写
 *   （注意：如果一个消息要被多个消费者消费的话，那么请使用广播模式）
 *
 */
@Component
@RocketMQMessageListener(
        consumerGroup="course-order-pay-consumer",
        topic="course-order-topic",
        selectorExpression="course-order-tag"
        //messageModel = MessageModel.CLUSTERING  默认是集群模式，一个消息只会被一个消费者消费
)
@Slf4j
public class CourserOrder2PayOrderConsumer implements RocketMQListener<MessageExt> {
    @Autowired
    private IPayOrderService payOrderService;

    //消费消息
    /**
     * =============必须保证消费方消费消息的幂等性============
     * 1.拿到消息，检查消息不为空
     * 2.根据拿到的订单号，查询payOrder表，校验订单号对应的支付单是否已经存在
     * 2.1 有，说明已经消费过，直接签收消息--后面Mq就不会在推送此消息了
     * 2.2 没有，说明是第一次来消费这个消息，保存支付单 payOrder
     *
     * 扩展：作为RocketMq，消费方是否签收消息是通过是否有异常来判断的
     *    onMessage方法体：
     *      1.如果有异常，签收失败 -- 此消息会滞留在Mq中，下次还会再被消费--直到MQ 自动清理垃圾
     *      2.如果没有异常，签收成功  -- 消息就不再会被再次消费
     *
     *
     *
     *
     * @param messageExt
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        //1.拿到消息，检查消息不为空
        AssertUtil.isNotNull(body,"消息体为空！！");//不签收消息
        String jsonStr = new String(body, StandardCharsets.UTF_8);
        PayDto payDto = JSONObject.parseObject(jsonStr, PayDto.class);
        //2.根据拿到的订单号，查询payOrder表，校验订单号对应的支付单是否已经存在
        //synchronized (payDto.getOrderNo()){
            PayOrder payOrder = payOrderService.selectByOrderNo(payDto.getOrderNo());
            if(payOrder != null){
                //2.1 有，说明已经消费过，直接签收消息--后面Mq就不会在推送此消息了
                log.info("订单已经存在，不再消费消息，直接签收！！");
                return;//不报错即签收
            }
            //2.2 没有，说明是第一次来消费这个消息，保存支付单 payOrder
            payOrder = new PayOrder();
            BeanUtils.copyProperties(payDto,payOrder);
            payOrder.setCreateTime(new Date());
            payOrder.setPayStatus(PayOrder.STATE_WAIT_PAY);
            payOrderService.insert(payOrder);
        //}

    }
}
